# Connectors: connect to data sources and sinks

A Feldera pipeline can process data from multiple heterogeneous sources and
produce outputs to multiple heterogeneous destinations. To this end it relies
on a growing library of **[input](sources)** and **[output](sinks)** connectors.

## Basics

A connector is a Feldera API object that describes an external data source or sink such as a database
table or a Kafka topic. Connectors work together with other API objects, namely programs and pipelines,
to build end-to-end streaming analytics pipelines.

* A [program](/docs#programs) defines a set of SQL tables and views.
* A [pipeline](/docs#pipelines) consists of a program with input and output connectors attached to it.
* A connector must explicitly be attached to a table (for input)
  or view (for output) of a pipeline.
* A connector can be attached to the tables/views of one or more pipelines,
  and can even be attached to multiple tables (for input) or views (for output)
  within the same pipeline.
* Connectors are managed through the API under the **/v0/connectors/...** endpoints.
  They can be retrieved
  [individually](https://www.feldera.com/api/fetch-a-connector-by-name) or
  [as a list](https://www.feldera.com/api/fetch-connectors-optionally-filtered-by-name-or-id),
  [created](https://www.feldera.com/api/create-a-new-connector),
  [overwritten](https://www.feldera.com/api/create-or-replace-a-connector),
  [modified](https://www.feldera.com/api/update-the-name-description-and-or-configuration-of-a-connector) and
  [deleted](https://www.feldera.com/api/delete-an-existing-connector).
* Changes to the connectors of a running pipeline take effect only
  when the pipeline is restarted.

A connector specification consists of three parts:

* Generic attributes common to all connectors, such as backpressure thresholds.
* Transport specification, which defines the data transport to be used by the connector
  (e.g.,
  [Kafka](/docs/connectors/sources/kafka), 
  [URL](/docs/connectors/sources/http-get), 
  [Delta Lake](/docs/connectors/sources/delta), etc.) and its configuration.
* Data format specification, which defines the data format for the connector
  (e.g., 
  [CSV](/docs/api/csv),
  [JSON](/docs/api/json), 
  [Parquet](/docs/api/parquet), or 
  Avro), and its specific dialect.

This architecture allows the user to combine different transports and data formats.
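
As a rough illustration of combining transports and formats, the following Python sketch assembles connector configurations as plain dictionaries. The `transport`/`format` layout and the `url_input` and `json` names are taken from the curl examples on this page. The helper `connector_config`, the `csv` format name, and the `.csv` URL are assumptions made for illustration. Generic attributes are left out of the sketch.

```python
import json


def connector_config(transport_name, transport_config, format_name, format_config=None):
    """Assemble the transport and format parts of a connector configuration."""
    return {
        "transport": {"name": transport_name, "config": transport_config},
        "format": {"name": format_name, "config": format_config or {}},
    }


# Same transport, two different data formats.
# The "csv" format name and the .csv URL are hypothetical.
tools_json = connector_config(
    "url_input", {"path": "https://example.com/tools-data.json"}, "json"
)
tools_csv = connector_config(
    "url_input", {"path": "https://example.com/tools-data.csv"}, "csv"
)

print(json.dumps(tools_json, indent=2))
```

The resulting dictionary is what would go under the `config` key of a connector definition.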

> **Note:** these basics apply to all connectors **except** the
  HTTP [input](/docs/connectors/sources/http) and [output](/docs/connectors/sinks/http) connectors.
  These are not managed by the user: they feed data into, or fetch data from, a pipeline
  directly through dedicated pipeline endpoints.

## Configuring the output buffer

By default, a Feldera pipeline sends a batch of changes to the output transport
for each batch of input updates it processes.  This can result in a stream of
small updates, which is normal and even preferable for output transports like
[Kafka](/docs/connectors/sinks/kafka).  However, it can cause performance problems
for other connectors, such as the [Delta Lake connector](/docs/connectors/sinks/delta),
where it creates a large number of small files.

The output buffer mechanism is designed to solve this problem by decoupling the
rate at which the pipeline pushes changes to the output transport from the rate
of input changes.  It accumulates updates inside the pipeline for up to a
user-defined period of time, or until a user-defined number of updates has
accumulated, and then writes them to the output transport as a single batch.
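
The idea can be sketched as a toy model in Python. This is not Feldera's implementation: the class `OutputBuffer`, its method names, and the use of `+1`/`-1` weights to represent inserts and deletes are all assumptions chosen to illustrate the two flush triggers and how opposite updates to the same record can cancel out inside the buffer.

```python
import time


class OutputBuffer:
    """Toy model of an output buffer with a size trigger and a time trigger."""

    def __init__(self, flush_fn, max_records=None, max_time_millis=None,
                 clock=time.monotonic):
        self.flush_fn = flush_fn
        self.max_records = max_records
        self.max_time_millis = max_time_millis
        self.clock = clock
        self.weights = {}    # record -> net weight (+1 insert, -1 delete)
        self.oldest = None   # arrival time of the oldest buffered update

    def push(self, record, weight):
        if self.oldest is None:
            self.oldest = self.clock()
        net = self.weights.get(record, 0) + weight
        if net == 0:
            self.weights.pop(record, None)  # insert and delete cancel out
        else:
            self.weights[record] = net
        self.maybe_flush()

    def maybe_flush(self):
        too_big = (self.max_records is not None
                   and len(self.weights) >= self.max_records)
        too_old = (self.max_time_millis is not None
                   and self.oldest is not None
                   and (self.clock() - self.oldest) * 1000 >= self.max_time_millis)
        if too_big or too_old:
            self.flush()

    def flush(self):
        if self.weights:
            self.flush_fn(dict(self.weights))  # emit one consolidated batch
        self.weights.clear()
        self.oldest = None


batches = []
buf = OutputBuffer(batches.append, max_records=2)
buf.push("hammer", +1)
buf.push("hammer", -1)  # cancels the previous insert, buffer is empty again
buf.push("saw", +1)
buf.push("drill", +1)   # second buffered record reaches the size limit
print(batches)
```

Four updates enter the buffer, but only one batch with two records reaches the output, because the insert and delete of `hammer` cancel each other.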

The output buffer can be set up for each individual output connector as part
of the connector configuration.  The following parameters are used to configure the
output buffer:

* `enable_output_buffer` - Enable output buffer.

* `max_output_buffer_time_millis` - Maximum time in milliseconds data is kept
   in the output buffer.

   When not specified, data is kept in the buffer indefinitely until one of
   the other trigger conditions is satisfied.  When this option is set,
   data stays in the buffer for no longer than
   `max_output_buffer_time_millis` milliseconds before it is flushed.

   NOTE: this configuration option requires the `enable_output_buffer` flag
   to be set.

* `max_output_buffer_size_records` - Maximum number of updates to be kept in
   the output buffer.

   This parameter bounds the maximum size of the buffer.
   Note that the size of the buffer is not always equal to the
   total number of updates output by the pipeline, because updates to the
   same record can overwrite or cancel previous updates.

   When not specified, the buffer can grow indefinitely until one of
   the other trigger conditions is satisfied.

   NOTE: this configuration option requires the `enable_output_buffer` flag
   to be set.

:::note

When the `enable_output_buffer` flag is set, at least one of
`max_output_buffer_time_millis` or `max_output_buffer_size_records` must be
specified.

:::
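
The constraints between these three parameters can be checked before a configuration is submitted. The following is a minimal sketch: the helper `output_buffer_config` and its argument names are hypothetical, and only the three parameter names come from this page. Where the resulting fragment belongs inside the connector configuration is not shown here.

```python
def output_buffer_config(enable=False, max_time_millis=None, max_size_records=None):
    """Return output buffer settings as a dict, enforcing the documented rules."""
    thresholds = {
        "max_output_buffer_time_millis": max_time_millis,
        "max_output_buffer_size_records": max_size_records,
    }
    given = {key: value for key, value in thresholds.items() if value is not None}

    # Each threshold requires the enable_output_buffer flag.
    if given and not enable:
        raise ValueError(f"{', '.join(given)} requires enable_output_buffer")
    # The flag requires at least one threshold.
    if enable and not given:
        raise ValueError("enable_output_buffer requires a time or size threshold")

    return {"enable_output_buffer": True, **given} if enable else {}


fragment = output_buffer_config(enable=True, max_time_millis=10000)
print(fragment)
```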

See the [Delta Lake output connector documentation](/docs/connectors/sinks/delta#curl)
for an example of configuring the output buffer.

## Example usage

### curl

#### Retrieve individual connector

Retrieve connector named `product-tools`.

```bash
curl -s -X GET http://localhost:8080/v0/connectors/product-tools | jq
```

#### Retrieve list of connectors
```bash
curl -s -X GET http://localhost:8080/v0/connectors | jq
```

#### Create connector

Create HTTP GET connector named `product-tools`.

```bash
curl -i -X PUT http://localhost:8080/v0/connectors/product-tools \
-H 'Content-Type: application/json' \
-d '{
  "description": "URL input connector for tools products",
  "config": {
      "transport": {
          "name": "url_input",
          "config": {
              "path": "https://example.com/tools-data.json"
          }
      },
      "format": {
          "name": "json",
          "config": {}
      }
  }
}'
```

#### Edit connector description
```bash
curl -i -X PATCH http://localhost:8080/v0/connectors/product-tools \
-H 'Content-Type: application/json' \
-d '{
  "description": "New description"
}'
```

#### Overwrite the entire connector (description and configuration)
```bash
curl -i -X PUT http://localhost:8080/v0/connectors/product-tools \
-H 'Content-Type: application/json' \
-d '{
  "description": "Another description",
  "config": {
      "transport": {
          "name": "url_input",
          "config": {
              "path": "https://example.com/new-tools-data.json"
          }
      },
      "format": {
          "name": "json",
          "config": {}
      }
  }
}'
```

#### Delete connector

Delete connector named `product-tools`.

```bash
curl -i -X DELETE http://localhost:8080/v0/connectors/product-tools
```

#### Attach input and output connector

A Feldera pipeline consists of an SQL program with input and output connectors attached to it.
The user lists connectors to be attached to the pipeline as part of the pipeline configuration.
In this example, we create pipeline `supply-chain-pipeline` with program `supply-chain-program`.
The pipeline attaches input connector `product-tools` to table `product`
and output connector `kafka-average-price` to view `average_price`.

```bash
curl -i -X PUT http://localhost:8080/v0/pipelines/supply-chain-pipeline \
-H 'Content-Type: application/json' \
-d '{
    "description": "Supply Chain pipeline",
    "program_name": "supply-chain-program",
    "config": {"workers": 4},
    "connectors": [
         {
             "connector_name": "product-tools",
             "is_input": true,
             "name": "product-tools",
             "relation_name": "product"
         },
         {
             "connector_name": "kafka-average-price",
             "is_input": false,
             "name": "kafka-average-price",
             "relation_name": "average_price"
         }
    ]
}'
```
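
The same request body can be generated programmatically, which helps when a pipeline attaches many connectors. The sketch below only builds the JSON document and does not send it. The helper names `attach` and `pipeline_body` are hypothetical, and reusing the connector name as the attachment `name` simply mirrors the curl example above.

```python
import json


def attach(connector_name, relation_name, is_input):
    """Describe one attachment of a connector to a table or view."""
    return {
        "connector_name": connector_name,
        "is_input": is_input,
        "name": connector_name,  # same value as in the curl example
        "relation_name": relation_name,
    }


def pipeline_body(description, program_name, inputs, outputs, workers=4):
    """Build a pipeline body; inputs/outputs map connector names to tables/views."""
    connectors = [attach(name, table, True) for name, table in inputs.items()]
    connectors += [attach(name, view, False) for name, view in outputs.items()]
    return {
        "description": description,
        "program_name": program_name,
        "config": {"workers": workers},
        "connectors": connectors,
    }


body = pipeline_body(
    "Supply Chain pipeline",
    "supply-chain-program",
    inputs={"product-tools": "product"},
    outputs={"kafka-average-price": "average_price"},
)
print(json.dumps(body, indent=4))
```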

### Python (direct API calls)

**Retrieve list of connectors while providing authorization:**
```python
import requests

api_url = "http://localhost:8080"
# Replace <API-KEY> with your API key.
headers = {"authorization": "Bearer <API-KEY>"}

for connector in requests.get(f"{api_url}/v0/connectors", headers=headers).json():
    print(connector)
```
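
The other connector operations shown in the curl section follow the same pattern. As a sketch that needs only the Python standard library, the helper below builds requests against `/v0/connectors/<name>` without sending them. The helper name `connector_request` is hypothetical, and the choice of `urllib.request` instead of `requests` is only to keep the example dependency-free.

```python
import json
import urllib.request

API_URL = "http://localhost:8080"


def connector_request(method, name, body=None, api_key=None):
    """Build (but do not send) a request for the connector with the given name."""
    headers = {}
    data = None
    if body is not None:
        headers["Content-Type"] = "application/json"
        data = json.dumps(body).encode("utf-8")
    if api_key is not None:
        headers["Authorization"] = f"Bearer {api_key}"
    return urllib.request.Request(
        f"{API_URL}/v0/connectors/{name}", data=data, headers=headers, method=method
    )


edit = connector_request("PATCH", "product-tools", {"description": "New description"})
delete = connector_request("DELETE", "product-tools", api_key="<API-KEY>")
print(edit.get_method(), edit.full_url)
print(delete.get_method(), delete.full_url)
```

Passing either request to `urllib.request.urlopen` would send it to a running Feldera instance.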

## Additional resources

For more information, see:

* [REST API curl tutorial](/docs/tutorials/rest_api)

* [REST API documentation](https://www.feldera.com/api/create-a-new-connector) for connectors.
